
[SPARK-10542] [PYSPARK] fix serialize namedtuple #8707

Closed
davies wants to merge 4 commits from davies/fix_namedtuple

Conversation

@davies (Contributor) commented Sep 10, 2015

No description provided.

@SparkQA commented Sep 10, 2015

Test build #42289 has finished for PR 8707 at commit d7ef6ce.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 10, 2015

Test build #42293 has finished for PR 8707 at commit 6b9095b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 10, 2015

Test build #42295 has finished for PR 8707 at commit 1d766aa.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 10, 2015

Test build #42298 has finished for PR 8707 at commit a2f9f36.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 10, 2015

Test build #1739 has finished for PR 8707 at commit a2f9f36.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkHadoopWriter(jobConf: JobConf)
    • class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
    • class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
    • case class Instance(w: Double, a: Vector, b: Double)
    • class DefaultSource extends RelationProvider with DataSourceRegister
    • class WeibullGenerator(
    • class IndexToString(JavaTransformer, HasInputCol, HasOutputCol):
    • class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
    • abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
    • abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)

@SparkQA commented Sep 11, 2015

Test build #42304 has finished for PR 8707 at commit 9326697.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen (Contributor)

Over at https://issues.apache.org/jira/browse/SPARK-10544, someone mentioned that some other built-in types do not seem to be pickleable in 1.5 either. For instance, here's the example they gave:

sc.parallelize(["the red", "Fox Runs", "FAST"]).map(str.lower).count()

However, this specific example also seems to fail in 1.3.1, so I don't think that this is a regression. Just wanted to mention this discussion here to make sure you were aware of it.
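
A minimal way to see the same failure without Spark (a sketch, not from the thread; the behavior noted below matches the Python 2.7 interpreters in use at the time, while Python 3.5+ can pickle such methods by qualified name):

import pickle

# On Python 2.7 (the era of this thread), plain pickle rejects built-in
# method descriptors such as str.lower, which is why shipping
# map(str.lower) to executors fails during closure serialization.
# Python 3.5+ pickles them by module and qualified name instead.
try:
    pickle.dumps(str.lower)
    print("pickled str.lower by name (Python 3.5+ behavior)")
except (pickle.PicklingError, TypeError) as exc:
    print("cannot pickle str.lower (Python 2 behavior):", exc)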

@JoshRosen (Contributor)

Do you have any intuition for why this worked prior to 1.5 without the changes implemented here?

# (context from earlier in this test: P = namedtuple("P", "x y"); p1 = P(1, 3))
from pickle import loads
from pyspark.cloudpickle import dumps
P2 = loads(dumps(P))
p3 = P2(1, 3)
self.assertEqual(p1, p3)
Review comment (Contributor):

Ah, interesting: presumably P and P2 are different classes but instances created from them are still comparable for equality. Do we also need to check that those instances claim to belong to the same class? It seems way less likely that users could rely on the class comparison behavior, so probably not a huge priority to look at.

Reply from davies (Contributor, Author):

These instances would belong to different classes.
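
A quick illustration of that distinction (a sketch run in __main__ with the cloudpickle bundled in this branch; newer upstream cloudpickle tracks classes and may return the identical class within one process):

from collections import namedtuple
from pickle import loads
from pyspark.cloudpickle import dumps

P = namedtuple("P", "x y")
P2 = loads(dumps(P))  # round-trip the class itself

# Equality still holds: namedtuples inherit tuple equality, which
# compares contents and ignores the concrete class...
assert P(1, 3) == P2(1, 3)

# ...but the round-trip rebuilds the class, so identity-based checks differ.
assert P is not P2
assert not isinstance(P2(1, 3), P)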

@JoshRosen (Contributor)

Actually, one point of confusion: it looks like python/pyspark/serializers.py already had logic for serializing namedtuple classes (its _restore and _hack_namedtuple(cls) methods), but this patch also adds logic to cloudpickle.py to handle pickling of these instances. Do we need both mechanisms? Are some only needed by certain versions of Python? It would be good to add some comments to explain this, since it's not obvious from reading the code.

@davies (Contributor, Author) commented Sep 11, 2015

The HACK in serializers.py is used for cPickle, not cloudpickle.
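
For reference, a simplified sketch of that hack (abbreviated from the idea in python/pyspark/serializers.py; the real module also hijacks collections.namedtuple at import time, so user-defined namedtuples pick this up automatically):

import collections

_cls_cache = {}  # recreated namedtuple classes, keyed by (name, fields)

def _restore(name, fields, value):
    """Rebuild a namedtuple instance from its name, fields, and values."""
    key = (name, fields)
    cls = _cls_cache.get(key)
    if cls is None:
        cls = collections.namedtuple(name, fields)
        _cls_cache[key] = cls
    return cls(*value)

def _hack_namedtuple(cls):
    """Make a namedtuple class picklable by plain (c)Pickle: instances
    reduce to a call of _restore, so the dynamically generated class
    itself never needs to be pickled."""
    name = cls.__name__
    fields = cls._fields

    def __reduce__(self):
        return (_restore, (name, fields, tuple(self)))

    cls.__reduce__ = __reduce__
    cls._is_namedtuple_ = True
    return cls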

@davies (Contributor, Author) commented Sep 11, 2015

Before 1.5, the old approach worked in CPython but not in PyPy (we don't have a unit test for it).

@davies (Contributor, Author) commented Sep 11, 2015

@JoshRosen BTW, this patch introduces a special case for namedtuple, so it should be safe to merge into branch-1.5.
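
The special case, roughly (a hedged sketch, not the verbatim diff; helper names here are illustrative): when cloudpickle meets a class that looks like a namedtuple, it serializes only the class name and field names and rebuilds the class on load.

from collections import namedtuple

def _is_namedtuple(cls):
    # Heuristic for the special case (hypothetical helper name for this
    # sketch): any tuple subclass exposing _fields is treated as a
    # namedtuple class.
    return (isinstance(cls, type) and issubclass(cls, tuple)
            and hasattr(cls, "_fields"))

def _load_namedtuple(name, fields):
    # Rebuild the class on the deserializing side; the pickler emits a
    # reduce call pointing here instead of pickling the class body.
    return namedtuple(name, fields)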

@JoshRosen (Contributor)

Empirically, this seems to work, so unless you think we should investigate the root cause any further, I'm fine with giving this an LGTM and merging to 1.5. Feel free to merge yourself, or I can do it.

@davies (Contributor, Author) commented Sep 15, 2015

I tried to find the root cause, but it seems hard to make it work across all Python versions (you can see the attempts in the older commits), so I finally switched to the current approach. Merging this into master and branch-1.5, thanks!

asfgit pushed a commit that referenced this pull request Sep 15, 2015
Author: Davies Liu <[email protected]>

Closes #8707 from davies/fix_namedtuple.
@asfgit closed this in 5520418 on Sep 15, 2015
@coderfi (Contributor) commented Sep 17, 2015

FYI, this works for us @ NinthDecimal. Thanks for the fix, it was a stumper!

Python 2.7.6
Spark-1.5 Branch @ 4c4a9ba28d9052fad45caca9a1eba9ef9db309d5

@ghost commented Jun 16, 2016

Hi! I am not sure if this is related, but when I search for this issue, everything basically points me here. I'm getting:

  ...
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 315, in save_builtin_function
    return self.save_function(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 191, in save_function
    if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'

When trying to create a data frame from an RDD:

from pyspark.sql.types import (StructType, StructField, StringType,
                               TimestampType, FloatType)

rdd = self.sc.textFile(self.input_file_path).map(lambda line: self.process_line(line))

schema = StructType([StructField(u'Variable', StringType(), nullable=False),
                     StructField(u'Time', TimestampType(), nullable=False),
                     StructField(u'Value', FloatType(), nullable=False)])

return sql_context.createDataFrame(rdd, schema)

I am on PySpark 1.6.0 - any ideas what I'm doing wrong here?
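
One pattern that can trigger this traceback (a hedged guess, not a diagnosis confirmed in the thread): lambda line: self.process_line(line) closes over self, so cloudpickle serializes the whole enclosing object, and any built-in-method attribute on it hits the AttributeError above. Moving the parser to a module-level function keeps self out of the closure:

from datetime import datetime

def process_line(line):
    # Hypothetical module-level parser (names and input format invented
    # for this sketch): nothing from self is captured, so only this
    # function's code is shipped to the executors.
    name, ts, val = line.split(",")
    return (name, datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"), float(val))

# rdd = sc.textFile(input_path).map(process_line)  # no bound method, no self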

@philastrophist

I also get this error when using namedtuples:

from collections import namedtuple
from pyspark import cloudpickle

n = namedtuple('test', ['a', 'b'])
cloudpickle.dumps(n(1, 2))

ashangit pushed a commit to ashangit/spark that referenced this pull request Oct 19, 2016
Author: Davies Liu <[email protected]>

Closes apache#8707 from davies/fix_namedtuple.

(cherry picked from commit d5c0361)